import os
import copy
import time
import nltk
import torch
import wandb
import random
import spacy # conda install -c conda-forge spacy + python -m spacy download en_core_web_sm
import torchvision
import torchtext
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import torch.optim as optim
from PIL import Image
from tqdm import tqdm
from datetime import datetime
from spacy.symbols import ORTH
from collections import Counter
from torch.optim import lr_scheduler
from torch.utils.data import Dataset
# from sklearn.model_selection import KFold
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from nltk.translate.bleu_score import sentence_bleu
from torchvision import datasets, models, transforms
# from sklearn.model_selection import train_test_split
from nltk.translate.bleu_score import SmoothingFunction
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('device: ', device)
# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
# sound (Windows only)
import winsound
device: cuda
Think about which model architecture could make sense. At least two model variants should be built and compared with each other.
def set_seed(seed=42):
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)
set_seed()
def play_sound(typ=0):
# play 'finish' sound
if typ==0:
winsound.PlaySound('../01_Dokumentation/win_sounds/beep.wav', winsound.SND_ASYNC)
if typ==1:
winsound.PlaySound('../01_Dokumentation/win_sounds/beep2.wav', winsound.SND_ASYNC)
images_folder = './data/Images'
captions_file = './data/captions.txt'
pd_captions = pd.read_csv(captions_file, sep='\t', header=None)  # read each line as a single column; split on the first comma below
pd_captions.columns = ['full_caption']
pd_captions[['image_name', 'caption']] = pd_captions['full_caption'].str.split(',', n=1, expand=True)
pd_captions.drop('full_caption', axis=1, inplace=True)
pd_captions.to_csv('./data/pd_captions.csv', index=False)
pd_captions.head(10)
| | image_name | caption |
|---|---|---|
| 0 | 1000268201_693b08cb0e.jpg | A child in a pink dress is climbing up a set o... |
| 1 | 1000268201_693b08cb0e.jpg | A girl going into a wooden building . |
| 2 | 1000268201_693b08cb0e.jpg | A little girl climbing into a wooden playhouse . |
| 3 | 1000268201_693b08cb0e.jpg | A little girl climbing the stairs to her playh... |
| 4 | 1000268201_693b08cb0e.jpg | A little girl in a pink dress going into a woo... |
| 5 | 1001773457_577c3a7d70.jpg | A black dog and a spotted dog are fighting |
| 6 | 1001773457_577c3a7d70.jpg | A black dog and a tri-colored dog playing with... |
| 7 | 1001773457_577c3a7d70.jpg | A black dog and a white dog with brown spots a... |
| 8 | 1001773457_577c3a7d70.jpg | Two dogs of different breeds looking at each o... |
| 9 | 1001773457_577c3a7d70.jpg | Two dogs on pavement moving toward each other . |
The captions.txt file contains the image name and the image description (caption). Five captions are available per image.
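A quick sanity check of this (a minimal sketch, assuming the pd_captions DataFrame built above): every image should appear with exactly five captions.
captions_per_image = pd_captions.groupby('image_name').size()
print(captions_per_image.value_counts())  # expected: a single entry mapping 5 to the number of images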
image_id = 5
example_image_path = f'{images_folder}/{pd_captions.image_name[image_id]}'
example_caption1 = pd_captions.caption[image_id+0]
example_caption2 = pd_captions.caption[image_id+1]
example_caption3 = pd_captions.caption[image_id+2]
example_caption4 = pd_captions.caption[image_id+3]
example_caption5 = pd_captions.caption[image_id+4]
image = Image.open(example_image_path)
plt.imshow(image)
plt.title(f'{example_caption1} \n {example_caption2} \n {example_caption3} \n {example_caption4} \n {example_caption5}')
plt.axis('off')
plt.show()
images_folder_path = './data/Images/'
resolutions = []
for image_filename in os.listdir(images_folder_path):
if image_filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
image_path = os.path.join(images_folder_path, image_filename)
with Image.open(image_path) as img:
resolutions.append(img.size)
dimension_labels = [f"{w}x{h}" for w, h in resolutions]
resolution_counts = Counter(dimension_labels)
sorted_resolution_counts = dict(resolution_counts.most_common(50))
plt.figure(figsize=(10, 3))
plt.bar(sorted_resolution_counts.keys(), sorted_resolution_counts.values(), color='skyblue')
plt.title('Distribution of image resolutions in Flickr8k (top 50)')
plt.xlabel('Dimension (w x h)')
plt.ylabel('Number of images')
plt.xticks(rotation=90)
plt.show()
widths, heights = zip(*resolutions)
plt.figure(figsize=(10, 3))
plt.subplot(1, 2, 1)
plt.boxplot(widths, patch_artist=True, boxprops=dict(facecolor='grey', color='black'),
whiskerprops=dict(color='black'), capprops=dict(color='black'),
medianprops=dict(color='red'))
plt.title('Distribution of image widths')
plt.ylabel('Pixels')
plt.xticks([1], ['w'])
plt.subplot(1, 2, 2)
plt.boxplot(heights, patch_artist=True, boxprops=dict(facecolor='grey', color='black'),
whiskerprops=dict(color='black'), capprops=dict(color='black'),
medianprops=dict(color='red'))
plt.title('Distribution of image heights')
plt.ylabel('Pixels')
plt.xticks([1], ['h'])
plt.tight_layout()
plt.show()
The text data is converted into tokens below (see the torchtext documentation).
# !pip show torchtext
# Version: 0.6.0
spacy_en = spacy.load('en_core_web_sm')
special_cases = [("<start>", [{ORTH: "<start>"}]), ("<end>", [{ORTH: "<end>"}]), ("<pad>", [{ORTH: "<pad>"}])]
for case in special_cases:
spacy_en.tokenizer.add_special_case(*case)
def tokenize_en(caption, lower_text=False):
if lower_text:
return [tok.text.lower() for tok in spacy_en.tokenizer(caption)]
else:
return [tok.text for tok in spacy_en.tokenizer(caption)]
print(f'Test tokenize: {example_caption1}')
tokens = tokenize_en(example_caption1)
tokens
Test tokenize: A black dog and a spotted dog are fighting
['A', 'black', 'dog', 'and', 'a', 'spotted', 'dog', 'are', 'fighting']
# list of all tokens occurring in the captions
token_series = pd_captions['caption'].apply(tokenize_en).explode()
count_token = token_series.value_counts()
count_token.index.name = 'token'
count_token = count_token.reset_index()
# count_token
num_token = 50
plt.figure(figsize = (10, 4))
plt.bar(count_token.head(num_token).token, count_token.head(num_token).iloc[:,1], color='skyblue')
plt.title(f'Total tokens: {len(count_token)}, showing the {num_token} most frequent')
plt.xticks(rotation=90)
plt.xlabel('Token')
plt.ylabel('Occurrences')
plt.show()
def tokenize_en_len(caption, lower_text=False):
if lower_text:
return len([tok.text.lower() for tok in spacy_en.tokenizer(caption)])
else:
return len([tok.text for tok in spacy_en.tokenizer(caption)])
token_series = pd_captions['caption'].apply(tokenize_en_len)
# token_series
plt.figure(figsize = (10, 3))
token_mean = token_series.mean()
token_std = token_series.std() / 2
plt.hist(token_series, color='skyblue', bins=40)
plt.axvline(token_mean, color='red', alpha=0.6, label=f'mean {token_mean:0.2f}')
plt.axvspan(token_mean-token_std, token_mean+token_std, color='grey', alpha=0.2, label=f'± std/2 ({token_std:0.2f})')
plt.suptitle('Distribution of the number of tokens per caption')
plt.title(f'min: {token_series.min()}, max: {token_series.max()}', fontsize=8)
plt.xlabel('Number of tokens')
plt.ylabel('Occurrences')
plt.legend()
plt.show()
Start and end tokens are introduced; they are meant to help the model recognise where the generation of a caption begins and where it should stop.
min_num_token = 5
count_token_filtered = count_token[count_token['count'] >= min_num_token]
print(f'Total tokens: {len(count_token)}')
print(f'Number of tokens occurring at least {min_num_token} times: {len(count_token_filtered)}')
Total tokens: 9209
Number of tokens occurring at least 5 times: 3096
START_TOKEN = "<start>"
END_TOKEN = "<end>"
pd_caption_mod = pd_captions.copy()
pd_caption_mod['caption'] = pd_caption_mod['caption'].apply(lambda x: f"{START_TOKEN} {x} {END_TOKEN}")
pd_caption_mod.to_csv('./data/pd_captions_mod.csv', index=False)
print(pd_captions['caption'][0])
print(pd_captions['caption'][1])
print(pd_captions['caption'][2])
print()
print(pd_caption_mod['caption'][0])
print(pd_caption_mod['caption'][1])
print(pd_caption_mod['caption'][2])
A child in a pink dress is climbing up a set of stairs in an entry way .
A girl going into a wooden building .
A little girl climbing into a wooden playhouse .

<start> A child in a pink dress is climbing up a set of stairs in an entry way . <end>
<start> A girl going into a wooden building . <end>
<start> A little girl climbing into a wooden playhouse . <end>
print(f'Number of captions: {len(pd_caption_mod)}')
unique_images = pd_caption_mod.image_name.unique()
print(f'Number of images: {len(unique_images)}')
unique_images = list(pd_caption_mod.image_name.unique())
train_images = random.sample(unique_images, k=int(len(unique_images) * 0.7))
test_images = list(set(unique_images) - set(train_images))
print(f'Training set (images): {len(train_images)}')
print(f'Test set (images): {len(test_images)}')
pd_train_set = pd_caption_mod[pd_caption_mod.image_name.isin(train_images)]
pd_test_set = pd_caption_mod[pd_caption_mod.image_name.isin(test_images)]
pd_train_set.to_csv('./data/train_captions.csv', index=False)
pd_test_set.to_csv('./data/test_captions.csv', index=False)
print(f'Training set (captions): {len(pd_train_set)}')
print(f'Test set (captions): {len(pd_test_set)}')
Number of captions: 40455
Number of images: 8091
Training set (images): 5663
Test set (images): 2428
Training set (captions): 28315
Test set (captions): 12140
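Because the split is done per image, no image should end up in both sets. A small check, assuming the pd_train_set and pd_test_set DataFrames from above:
assert set(pd_train_set.image_name).isdisjoint(set(pd_test_set.image_name)), 'train/test images overlap'
print('train and test image sets are disjoint')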
train_set = pd.read_csv('./data/train_captions.csv')
test_set = pd.read_csv('./data/test_captions.csv')
train_set.head(10)
| | image_name | caption |
|---|---|---|
| 0 | 1000268201_693b08cb0e.jpg | <start> A child in a pink dress is climbing up... |
| 1 | 1000268201_693b08cb0e.jpg | <start> A girl going into a wooden building . ... |
| 2 | 1000268201_693b08cb0e.jpg | <start> A little girl climbing into a wooden p... |
| 3 | 1000268201_693b08cb0e.jpg | <start> A little girl climbing the stairs to h... |
| 4 | 1000268201_693b08cb0e.jpg | <start> A little girl in a pink dress going in... |
| 5 | 1002674143_1b742ab4b8.jpg | <start> A little girl covered in paint sits in... |
| 6 | 1002674143_1b742ab4b8.jpg | <start> A little girl is sitting in front of a... |
| 7 | 1002674143_1b742ab4b8.jpg | <start> A small girl in the grass plays with f... |
| 8 | 1002674143_1b742ab4b8.jpg | <start> There is a girl with pigtails sitting ... |
| 9 | 1002674143_1b742ab4b8.jpg | <start> Young girl with pigtails painting outs... |
image_id = 5
example_image_path = f'{images_folder}/{train_set.image_name[image_id]}'
example_caption1 = train_set.caption[image_id+0]
example_caption2 = train_set.caption[image_id+1]
example_caption3 = train_set.caption[image_id+2]
example_caption4 = train_set.caption[image_id+3]
example_caption5 = train_set.caption[image_id+4]
image = Image.open(example_image_path)
plt.imshow(image)
plt.title(f'{example_caption1} \n {example_caption2} \n {example_caption3} \n {example_caption4} \n {example_caption5}')
plt.axis('off')
plt.show()
token_series = train_set['caption'].apply(tokenize_en_len)
print(f'Maximum caption length: {token_series.max()} tokens')
Maximum caption length: 44 tokens
def build_vocab(tokenized_captions, min_freq):
    # Count how often each token occurs across all captions
    token_counts = Counter(token for caption in tokenized_captions for token in caption)
    # Build the vocabulary only from tokens that occur at least min_freq times
vocab = {
"<pad>": 0,
"<start>": 1,
"<end>": 2
}
token_id = 3
for token, count in token_counts.items():
if count >= min_freq:
vocab[token] = token_id
token_id += 1
return vocab
def create_matrices_from_captions(train_set):
captions = train_set['caption']
tokenized_captions = [tokenize_en(caption, lower_text=True) for caption in captions]
    # Build the vocabulary with a minimum token frequency of 5
    vocab = build_vocab(tokenized_captions, min_freq=5)
    # Use the vocabulary to convert the captions into index sequences
    indexed_captions = [[vocab.get(token, vocab["<pad>"]) for token in caption] for caption in tokenized_captions]
    caption_tensors = [torch.tensor(caption) for caption in indexed_captions]
    # Determine the maximum length (informational; pad_sequence below pads to the longest sequence itself)
    max_length = max(len(caption) for caption in caption_tensors)
    # Add padding so that all captions have the same length
padded_captions = pad_sequence(caption_tensors, batch_first=True, padding_value=vocab["<pad>"])
return padded_captions, vocab
def indices_to_words(tensor_indices, vocab, rm_pad=True):
index_to_word = {index: word for word, index in vocab.items()}
words = [index_to_word.get(index.item(), '<unk>') for index in tensor_indices]
if rm_pad:
words = [word for word in words if word != '<pad>']
return words
padded_captions, vocab = create_matrices_from_captions(train_set)
print(f'Matrix dim: {padded_captions.shape}')
print(f'Vocabulary size: {len(vocab)}')
padded_captions[:2]
Matrix dim: torch.Size([28315, 44])
Vocabulary size: 2484
tensor([[ 3, 4, 5, 6, 4, 7, 8, 9, 10, 11, 4, 12, 13, 14, 6, 15, 0, 16,
17, 18, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0],
[ 3, 4, 19, 20, 21, 4, 22, 23, 17, 18, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0]])
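As a round-trip check (a small sketch, assuming padded_captions, vocab and the indices_to_words helper from above), the index rows can be mapped back to words; padding indices are removed by the helper.
print(indices_to_words(padded_captions[0], vocab))
print(indices_to_words(padded_captions[1], vocab))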
# from torchtext.vocab import GloVe
# glove = GloVe(name='6B', dim=300)
# glove_embeddings = glove.vectors
# print(glove_embeddings.shape)
padded_captions, vocab = create_matrices_from_captions(train_set)
def load_glove_embeddings(path='./data/glove.6B.300d.txt'):
    embeddings = {}
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            vector = torch.tensor([float(val) for val in values[1:]], dtype=torch.float)
            embeddings[word] = vector
    return embeddings
glove_embeddings = load_glove_embeddings()
def create_embedding_matrix(vocab, glove_embeddings, embedding_dim = 300):
embedding_matrix = torch.zeros((len(vocab)+2, embedding_dim))
for word, idx in vocab.items():
if word in glove_embeddings:
embedding_matrix[idx] = glove_embeddings[word]
else:
            embedding_matrix[idx] = torch.randn(embedding_dim)  # random vector for words that are not in GloVe
return embedding_matrix
embedding_matrix = create_embedding_matrix(vocab, glove_embeddings)
embedding_matrix.shape
torch.Size([2486, 300])
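How much of the vocabulary is actually covered by GloVe? A small check (a sketch assuming the vocab and glove_embeddings objects from above); tokens without a GloVe vector were given a random vector above.
in_glove = sum(1 for word in vocab if word in glove_embeddings)
print(f'{in_glove} of {len(vocab)} vocabulary tokens have a GloVe vector')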
class FlickrDataset(Dataset):
def __init__(self, csv_file_name, root_dir, vocab, embedding_matrix, transform=None):
self.captions_frame = pd.read_csv(csv_file_name)
self.root_dir = root_dir
self.vocab = vocab
self.embedding_matrix = embedding_matrix
self.transform = transform
def __len__(self):
return len(self.captions_frame)
def __getitem__(self, idx):
img_name = os.path.join(self.root_dir, self.captions_frame.iloc[idx, 0])
        image = Image.open(img_name).convert('RGB')
caption = self.captions_frame.iloc[idx,1]
image_name = self.captions_frame.iloc[idx,0]
tokenized_caption = tokenize_en(caption, lower_text=True)
caption_indices = [self.vocab.get(token, self.vocab['<pad>']) for token in tokenized_caption]
        # Convert the list of indices into a tensor
        caption_indices_tensor = torch.tensor(caption_indices, dtype=torch.long)
        # Look up the embedding vectors for the indices
        caption_embeddings = torch.stack([self.embedding_matrix[i] for i in caption_indices])
if self.transform:
image = self.transform(image)
return image, caption, caption_indices_tensor, caption_embeddings, image_name
def collate_fn(self, batch):
images, caption, caption_indices, caption_embeddings, image_name = zip(*batch)
        # Pad caption_indices and caption_embeddings to the longest caption in the batch
        caption_indices_padded = pad_sequence(caption_indices, batch_first=True, padding_value=self.vocab['<pad>'])
        caption_embeddings_padded = pad_sequence(caption_embeddings, batch_first=True, padding_value=self.vocab['<pad>'])
        images = torch.stack(images)  # stack the images into a single tensor
return images, caption, caption_indices_padded, caption_embeddings_padded, image_name
train_set = FlickrDataset(
csv_file_name='./data/train_captions.csv',
root_dir='./data/Images',
vocab=vocab,
embedding_matrix=embedding_matrix,
transform=None
)
# Test the dataset
image, caption, caption_indices, caption_embeddings, image_name = train_set[0]
plt.figure(figsize = (10, 5))
plt.imshow(image)
plt.suptitle(f'{caption}', fontsize=10)
plt.title(f'caption_indices length: {len(caption_indices)}, caption_embeddings length: {len(caption_embeddings)}', fontsize=8)
plt.axis('off')
plt.show()
normalize = transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
denormalize = transforms.Normalize(
mean=[-m / s for m, s in zip([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])],
std=[1 / s for s in [0.229, 0.224, 0.225]]
)
transformations = transforms.Compose([
transforms.Resize((256, 256)),
transforms.RandomCrop(224),
transforms.ToTensor(),
normalize
])
train_set = FlickrDataset(
csv_file_name='./data/train_captions.csv',
root_dir='./data/Images',
vocab=vocab,
embedding_matrix=embedding_matrix,
transform=transformations
)
test_set = FlickrDataset(
csv_file_name='./data/test_captions.csv',
root_dir='./data/Images',
vocab=vocab,
embedding_matrix=embedding_matrix,
transform=transformations
)
train_dataloader = DataLoader(train_set, batch_size=4, shuffle=True, collate_fn=train_set.collate_fn)
test_dataloader = DataLoader(test_set, batch_size=4, shuffle=False, collate_fn=test_set.collate_fn)
# Test the dataloader: check that the caption embeddings within a batch all have the same length
for i_batch, (image, caption, caption_indices, caption_embeddings, image_name) in enumerate(train_dataloader):
    print(f'caption_indices length 1: {len(caption_indices[0])}, caption_embeddings length 1: {len(caption_embeddings[0])}')
    print(f'caption_indices length 2: {len(caption_indices[1])}, caption_embeddings length 2: {len(caption_embeddings[1])}')
    print(f'caption_indices length 3: {len(caption_indices[2])}, caption_embeddings length 3: {len(caption_embeddings[2])}')
    print(f'caption_indices length 4: {len(caption_indices[3])}, caption_embeddings length 4: {len(caption_embeddings[3])}')
print()
print('Example Image 1/4')
plt.figure(figsize = (10, 4))
img_tensor_denorm = denormalize(image[0])
img_pil = transforms.ToPILImage()(img_tensor_denorm)
plt.imshow(img_pil)
plt.suptitle(caption[0])
    plt.title(f'words from caption_indices: {indices_to_words(caption_indices[0], vocab)}', fontsize=7)
plt.axis('off')
plt.show()
break
caption_indices length 1: 10, caption_embeddings length 1: 10
caption_indices length 2: 10, caption_embeddings length 2: 10
caption_indices length 3: 10, caption_embeddings length 3: 10
caption_indices length 4: 10, caption_embeddings length 4: 10

Example Image 1/4
Show and Tell
class ImageCaptioningModel(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout_prob=0.5, glove_em=None):
super(ImageCaptioningModel, self).__init__()
self.vocab_size = vocab_size
        # Load the pretrained ResNet-50 without its final classification layer
resnet = models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V2)
modules = list(resnet.children())[:-1]
self.resnet = nn.Sequential(*modules)
self.fc = nn.Linear(resnet.fc.in_features, embedding_dim)
        # Embedding layer for the captions
if glove_em is not None:
self.embedding = nn.Embedding.from_pretrained(glove_em, freeze=True)
print('using glove')
else:
self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # LSTM for caption generation
self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
self.dropout = nn.Dropout(dropout_prob)
        # Linear layer that predicts the word indices
self.linear = nn.Linear(hidden_dim, vocab_size)
self.batch_norm = nn.BatchNorm1d(embedding_dim)
def forward(self, images, captions):
        # CNN part
        with torch.no_grad():  # do not compute gradients for the ResNet backbone
features = self.resnet(images)
features = features.reshape(features.size(0), -1)
features = self.fc(features)
features = self.batch_norm(features)
features = self.dropout(features)
        # Embedding and LSTM part
        captions_cut = captions.clone()
        captions_cut[captions_cut >= self.vocab_size-1] = 0  # map out-of-range indices to <pad> before the embedding lookup
embeddings = self.embedding(captions_cut)
embeddings = self.dropout(embeddings)
embeddings = torch.cat((features.unsqueeze(1), embeddings), 1)
hiddens, _ = self.lstm(embeddings)
outputs = self.linear(hiddens)
        outputs_cut = outputs[:, :, 0:self.vocab_size-1]  # keep only the logits for the valid vocabulary indices
return outputs_cut
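The forward pass above is teacher-forced: the ground-truth caption is fed to the LSTM alongside the image features. At inference time a caption has to be generated token by token instead. The following is a minimal greedy-decoding sketch, assuming the ImageCaptioningModel above, the global vocab dictionary and device; the helper name generate_caption_greedy is not part of the original notebook.
def generate_caption_greedy(model, image, vocab, max_len=44, device=device):
    # image: a single transformed image tensor of shape (3, 224, 224); the model is assumed to already be on `device`
    index_to_word = {index: word for word, index in vocab.items()}
    model.eval()
    words = []
    with torch.no_grad():
        features = model.resnet(image.unsqueeze(0).to(device))
        features = model.batch_norm(model.fc(features.reshape(1, -1)))
        inputs, states = features.unsqueeze(1), None
        for _ in range(max_len):
            hiddens, states = model.lstm(inputs, states)
            logits = model.linear(hiddens[:, -1])[:, :model.vocab_size-1]
            next_idx = logits.argmax(dim=-1)
            word = index_to_word.get(next_idx.item(), '<unk>')
            if word == '<end>':
                break
            if word != '<start>':
                words.append(word)
            inputs = model.embedding(next_idx).unsqueeze(1)
    return ' '.join(words)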
There are several variants of the BLEU score (Bilingual Evaluation Understudy), which differ mainly in the number of n-grams considered. An n-gram is a contiguous sequence of n items from a given sample of text. The BLEU score can be computed on unigrams (single words), bigrams (pairs of consecutive words), trigrams, and so on. BLEU scores for different n-gram orders are often combined into an overall score; this is usually called the cumulative BLEU score.
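A small illustration using nltk's sentence_bleu directly (a sketch; the example sentences are made up): the unigram score only rewards matching words, while the cumulative 4-gram score also rewards matching word order.
reference = ['a', 'black', 'dog', 'runs', 'on', 'the', 'grass']
hypothesis = ['a', 'dog', 'runs', 'on', 'grass']
smooth = SmoothingFunction().method1
print(sentence_bleu([reference], hypothesis, weights=(1,), smoothing_function=smooth))  # unigram BLEU
print(sentence_bleu([reference], hypothesis, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smooth))  # cumulative 4-gram BLEU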
def remove_start_end_words(predicted_captions):
predicted_captions_trimmed = [caption.replace('<start> ', '') for caption in predicted_captions]
predicted_captions_trimmed = [caption.replace(' <end>', '') for caption in predicted_captions_trimmed]
return predicted_captions_trimmed
def calculate_bleu(true_caption, predicted_caption, n_gram=1):
if isinstance(true_caption, str):
true_caption = tokenize_en(true_caption)
if isinstance(predicted_caption, str):
predicted_caption = tokenize_en(predicted_caption)
smoothing = SmoothingFunction().method1
if n_gram == 1:
score = sentence_bleu([true_caption], predicted_caption, weights=(1,), smoothing_function=smoothing)
elif n_gram == 2:
score = sentence_bleu([true_caption], predicted_caption, weights=(0.5, 0.5), smoothing_function=smoothing)
elif n_gram == 3:
score = sentence_bleu([true_caption], predicted_caption, weights=(0.33, 0.33, 0.33), smoothing_function=smoothing)
elif n_gram == 4:
score = sentence_bleu([true_caption], predicted_caption, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothing)
else:
        raise ValueError("N-gram value outside the valid range")
return score
def calculate_blue_batch(true_captions, predicted_caption, n_gram=1, get_mean=True):
blue_scores_n = [calculate_bleu(true_cap, cap_pred, n_gram) for (true_cap, cap_pred) in zip(true_captions, predicted_caption)]
if get_mean:
return np.mean(blue_scores_n)
else:
return blue_scores_n
true_caption = "Das ist ein Test."
predicted_caption = "Dies ist ein Test."
bleu_score = calculate_bleu(true_caption, predicted_caption, n_gram=4)
print('Blue Score Test:')
print(f"Bleu Score: {bleu_score}")
print()
predicted_caption = [
"<start> a brown and white dog is running on the grass . <end>",
"<start> a brown and white dog is running on the grass . <end>"
]
predicted_captions_trimmed = remove_start_end_words(predicted_caption)
print('Remove <start> and <end> from caption test:')
print(f'caption: {predicted_caption[0]}')
print(f'caption trimmed: {predicted_captions_trimmed[0]}')
Blue Score Test:
Bleu Score: 0.668740304976422

Remove <start> and <end> from caption test:
caption: <start> a brown and white dog is running on the grass . <end>
caption trimmed: a brown and white dog is running on the grass .
def train_modell(config, model, dataloader, optimizer, criterion, epochs, device, test_batch=False):
set_seed(config['set_seed'])
model.to(device)
model.train()
epoch_losses = []
batch_losses = []
if config['write_wandb']:
model_name = f"{config['name']}-{config['epochs']}-epochs-{config['start_time']}"
wandb.init(
project="del-mc2",
entity='manuel-schwarz',
group=config['group'],
name= model_name,
tags=str(config['tags']) + (' is_test_batch' if config['is_test_batch'] else ''),
config=config
)
wandb.watch(model)
if config['use_gpu_memory_snapshot']:
MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT = 100_000
torch.cuda.memory._record_memory_history(max_entries=MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT)
for epoch in range(epochs):
ep_loss = []
loop = tqdm(enumerate(dataloader), total=len(dataloader), leave=False)
for i, (images, captions, caption_indices, glove_embeddings, image_name) in loop:
if test_batch and i > 1:
break
# if i < 13500: #error: 12613
# continue
# if config['use_glove_emb']:
# images, captions_emb2 = images.to(device), glove_embeddings.to(device)
# # captions_emb2 = captions_emb2.long()
# else:
# images, captions_emb = images.to(device), caption_indices.to(device)
images, captions_emb = images.to(device), caption_indices.to(device)
# outputs = model(images, captions_emb2[:, :-1]) # Exclude the <end> token
outputs = model(images, captions_emb[:, :-1]) # Exclude the <end> token
# targets = caption_indices[:, 1:].contiguous().view(-1) # Exclude the <start> token
targets = captions_emb[:, :].contiguous().view(-1)
targets_cut = targets.clone()
targets_cut[targets_cut >= config['vocab_size']-1] = 0
output_shaped = outputs.view(-1, outputs.size(-1))
# output_shaped_cpu = output_shaped.cpu()
# loss = criterion(outputs.view(-1, outputs.size(-1)), targets)
loss = criterion(output_shaped, targets_cut)
# batch_losses.append(loss.item())
ep_loss.append(loss.item())
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
if config['use_gpu_memory_snapshot'] and epoch == 5:
file_name = f'{config["start_time"]}_epoch_{epoch}_gpu_snapshot'
save_path = f'./gpu_snapshot/'
try:
torch.cuda.memory._dump_snapshot(f"{save_path}{file_name}.pickle")
except Exception as e:
print(f"Failed to capture memory snapshot {e}")
epoch_losses.append(np.mean(ep_loss))
# print(f'Epoch {epoch} Loss: {np.mean(ep_loss)}')
if config['write_wandb']:
wandb.log({
"train loss epoch": np.mean(ep_loss)
})
if config['write_wandb']:
wandb.finish()
time.sleep(5) # wait for wandb.finish
if config['use_gpu_memory_snapshot']:
torch.cuda.memory._record_memory_history(enabled=None)
    print('Model finished!')
play_sound(1)
return epoch_losses, batch_losses
def plot_loss_model(epoch_losses, name='-'):
plt.figure(figsize=(10,5))
plt.plot(epoch_losses, marker='o', color='skyblue', label='Training loss per epoch')
plt.title(f'Training Loss per Epoch (model: {name})')
    plt.xlabel('Epoch')
plt.ylabel('Loss')
# plt.legend()
plt.grid(True)
plt.show()
def save_model(model, config):
path = f'./models/{config["start_time"]}_{config["name"]}_epochs_{config["epochs"]}.pth'
torch.save(model, path)
def load_model(model_name, device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')):
path = f'./models/{model_name}.pth'
model = torch.load(path)
return model
from datetime import datetime
# Hyperparameters
config = {
"name": "CNN_LSTM_name", # CNN_LSTM_glove
"epochs": 100,
"train_batch_size": 128,
"test_batch_size": 64,
"dataset": "flickr8k",
"lr": 0.1,
"optimizer": 'SGD',
"loss_func": 'CrossEntropyLoss',
"image_size": 256,
"is_test_batch": False,
"start_time": datetime.now().strftime("%d.%m.%Y_%H%M"),
"num_workers": 0,
"dropout": 0.5,
"set_seed": 42,
'vocab_size': len(vocab)+1,
'embedding_dim': 300,
'hidden_dim': 512,
'num_layers': 1,
'write_wandb':True,
'group': 'first model',
'tags': 'training',
'use_glove_emb': False,
'save_model': True,
'device': str(torch.device('cuda' if torch.cuda.is_available() else 'cpu')),
    'use_gpu_memory_snapshot': False  # note (example code): the GPU memory snapshot is only available on Linux!
}
train_dataloader = DataLoader(
train_set,
batch_size=config['train_batch_size'],
shuffle=True,
collate_fn=train_set.collate_fn
)
test_dataloader = DataLoader(
test_set,
batch_size=config['test_batch_size'],
shuffle=False,
collate_fn=test_set.collate_fn
)
model1 = ImageCaptioningModel(
vocab_size = config['vocab_size'],
embedding_dim = config['embedding_dim'],
hidden_dim = config['hidden_dim'],
num_layers = config['num_layers'],
dropout_prob= config['dropout'],
glove_em = embedding_matrix if config['use_glove_emb'] else None
)
# model_name = '03.01.2024_1811_CNN_LSTM_glove_b_epochs_140'
# model1 = load_model(model_name)
# optimizer = optim.Adam(model1.parameters(), lr=0.001)
optimizer = torch.optim.SGD(model1.parameters(), lr=config['lr'])
criterion = nn.CrossEntropyLoss()
if False:  # set to True to (re)train the model; a previously trained model is loaded further below
epoch_losses, batch_losses = train_modell(
config,
model1,
train_dataloader,
optimizer,
criterion,
epochs=config['epochs'],
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
# device='cpu',
test_batch=config['is_test_batch']
)
if config['save_model']:
save_model(model1, config)
        print('Model saved!')
plot_loss_model(epoch_losses, 'model1')
def batch_indices_to_sentences(batch_indices, vocab, rm_pad=True):
sentences = []
for tensor_indices in batch_indices:
words = indices_to_words(tensor_indices, vocab, rm_pad)
sentence = ' '.join(words)
sentences.append(sentence)
return sentences
def model_output_to_caption(model_prediction, vocab):
_, pred_indices = torch.max(model_prediction, dim=-1)
pred_captions = batch_indices_to_sentences(pred_indices, vocab)
return pred_captions
def plot_test_images(images, caption_pred, image_name, plot_num_img=4):
if plot_num_img > len(image_name):
plot_num_img = len(image_name)
pd_data = pd.read_csv('./data/pd_captions.csv')
for i in range(0, 5 * plot_num_img, 5):
pd_captions = pd_data[pd_data.image_name == image_name[i]].caption.reset_index(drop=True)
cap1, cap2, cap3, cap4, cap5 = pd_captions[0], pd_captions[1], pd_captions[2], pd_captions[3], pd_captions[4]
plt.figure(figsize = (10, 4))
img_tensor_denorm = denormalize(images[i])
img_pil = transforms.ToPILImage()(img_tensor_denorm)
plt.imshow(img_pil)
plt.suptitle(f'pred: {caption_pred[i]}', color='red', y=1.15)
plt.title(f'\n true:{cap1} \n true:{cap2} \n true:{cap3} \n true:{cap4} \n true:{cap5}', color='green', fontsize=9)
plt.axis('off')
plt.show()
device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model_name = '31.12.2023_1632_CNN_LSTM_epochs_30'
# model_name = '31.12.2023_1849_CNN_LSTM_b_epochs_30'
# model_name = '31.12.2023_2107_CNN_LSTM_c_epochs_90'
# model_name = '01.01.2024_0842_CNN_LSTM_d_epochs_30'
# model_name = '01.01.2024_1104_CNN_LSTM_e_epochs_30'
model_name = '01.01.2024_1341_CNN_LSTM_f_epochs_30'
model1_loaded = load_model(model_name)
model1_loaded.eval()
for i, (images, captions, caption_indices, glove_embeddings, image_name) in enumerate(test_dataloader):
with torch.no_grad():
images, captions_emb = images.to(device), caption_indices.to(device)
pred = model1_loaded(images, captions_emb[:, :-1])
caption_pred = model_output_to_caption(pred, vocab)
plot_test_images(images, caption_pred, image_name, plot_num_img=4)
blue_scores_n1 = calculate_blue_batch(captions, caption_pred, 1)
blue_scores_n2 = calculate_blue_batch(captions, caption_pred, 2)
blue_scores_n3 = calculate_blue_batch(captions, caption_pred, 3)
blue_scores_n4 = calculate_blue_batch(captions, caption_pred, 4)
break
# caption_pred
device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model_glove_name = '01.01.2024_2227_CNN_LSTM_glove_a_epochs_100'
# model_glove_name = '03.01.2024_1811_CNN_LSTM_glove_b_epochs_140'
model_glove_name = '04.01.2024_1846_CNN_LSTM_glove_c_epochs_100'
model1_glove_loaded = load_model(model_glove_name)
model1_glove_loaded.eval()
for i, (images, captions, caption_indices, glove_embeddings, image_name) in enumerate(test_dataloader):
with torch.no_grad():
images, captions_emb = images.to(device), caption_indices.to(device)
pred = model1_glove_loaded(images, captions_emb[:, :-1])
caption_pred = model_output_to_caption(pred, vocab)
plot_test_images(images, caption_pred, image_name, plot_num_img=4)
blue_scores_n1 = calculate_blue_batch(captions, caption_pred, 1)
blue_scores_n2 = calculate_blue_batch(captions, caption_pred, 2)
blue_scores_n3 = calculate_blue_batch(captions, caption_pred, 3)
blue_scores_n4 = calculate_blue_batch(captions, caption_pred, 4)
break
def model_predictions_blue_scores(model, testloader, trim_captions=True):
    '''
    Runs the model on the test loader and returns the mean BLEU scores for 1- to 4-grams.
    param:
    trim_captions: removes the added <start> and <end> tokens from the captions
    '''
blue_scores_n1_full = []
blue_scores_n2_full = []
blue_scores_n3_full = []
blue_scores_n4_full = []
model.eval()
for i, (images, captions, caption_indices, glove_embeddings, image_name) in enumerate(testloader):
with torch.no_grad():
images, captions_emb = images.to(device), caption_indices.to(device)
pred = model(images, captions_emb[:, :-1])
caption_pred = model_output_to_caption(pred, vocab)
if trim_captions:
caption_pred = remove_start_end_words(caption_pred)
captions = remove_start_end_words(captions)
blue_scores_n1_full.append(calculate_blue_batch(captions, caption_pred, 1))
blue_scores_n2_full.append(calculate_blue_batch(captions, caption_pred, 2))
blue_scores_n3_full.append(calculate_blue_batch(captions, caption_pred, 3))
blue_scores_n4_full.append(calculate_blue_batch(captions, caption_pred, 4))
mean_scores = [
np.mean(blue_scores_n1_full),
np.mean(blue_scores_n2_full),
np.mean(blue_scores_n3_full),
np.mean(blue_scores_n4_full)
]
return mean_scores
def plot_model_blue_score(mean_scores, title='title', figsize=(8, 4)):
labels = ['Blue 1-gram', 'Blue 2-gram', 'Blue 3-gram', 'Blue 4-gram']
fig, ax = plt.subplots(figsize=figsize)
ax.bar(labels, mean_scores, color='skyblue', zorder=3)
for i in range(len(labels)):
ax.text(i, mean_scores[i], f'{mean_scores[i]:.2f}',
ha='center', va='top', color='white')
    plt.suptitle('Overview of the model BLEU scores for different n-grams')
    plt.title(f'{title}', fontsize=9)
    plt.xlabel('')
    plt.ylabel('BLEU score')
plt.xticks(rotation=90)
plt.yticks(np.arange(0,0.6,0.1))
plt.grid(axis='y', color='silver', zorder=3)
plt.show()
def plot_model_blue_score_diff(mean_scores:list, mean_scores_glove:list, epochs=0, figsize=(10, 6)):
labels = ['Blue 1-gram', 'Blue 2-gram', 'Blue 3-gram', 'Blue 4-gram']
bar_width = 0.3
fig, ax = plt.subplots(figsize=figsize)
    bars1 = ax.bar(np.arange(len(labels)), mean_scores, bar_width, label='Learned embeddings', color='dodgerblue', zorder=3)
    bars2 = ax.bar(np.arange(len(labels)) + bar_width, mean_scores_glove, bar_width, label='GloVe embeddings', color='orange', zorder=3)
for i in range(len(labels)):
ax.text(i, mean_scores[i], f'{mean_scores[i]:.3f}',
ha='center', va='top', color='white')
ax.text(i + bar_width, mean_scores_glove[i], f'{mean_scores_glove[i]:.3f}',
ha='center', va='top', color='white')
ax.set_xticks(np.arange(len(labels)) + bar_width / 2)
ax.set_xticklabels(labels)
ax.set_yticks(np.arange(0,0.6,0.1))
    ax.set_ylabel('BLEU score')
    ax.set_xlabel('')
    ax.set_title(f'Comparison of the models without and with GloVe embeddings (epochs: {epochs})')
ax.legend()
ax.grid(axis='y')
plt.show()
model_name_epoch_240 = '01.01.2024_1341_CNN_LSTM_f_epochs_30'
model_loaded_epoch_240 = load_model(model_name_epoch_240)
mean_scores = model_predictions_blue_scores(model_loaded_epoch_240, test_dataloader)
# plot_model_blue_score(mean_scores, title='Eigene Wortembedings')
model_glove_name_epoch_240 = '04.01.2024_1846_CNN_LSTM_glove_c_epochs_100'
model_glove_loaded_epoch_240 = load_model(model_glove_name_epoch_240)
mean_scores_glove = model_predictions_blue_scores(model_glove_loaded_epoch_240, test_dataloader)
# plot_model_blue_score(mean_scores_glove, title='Glove Wortembedings')
plot_model_blue_score_diff(mean_scores, mean_scores_glove, epochs=240)
model_name_epoch_150 = '31.12.2023_2107_CNN_LSTM_c_epochs_90'
model_loaded_epoch_150 = load_model(model_name_epoch_150)
mean_scores_150 = model_predictions_blue_scores(model_loaded_epoch_150, test_dataloader)
# Glove Model
model_glove_name_epoch_140 = '03.01.2024_1811_CNN_LSTM_glove_b_epochs_140'
model_glove_loaded_epoch_140 = load_model(model_glove_name_epoch_140)
mean_scores_glove_140 = model_predictions_blue_scores(model_glove_loaded_epoch_140, test_dataloader)
# plot the difference between the models
plot_model_blue_score_diff(mean_scores_150, mean_scores_glove_140, epochs=140)
def get_best_model_predictions(model, testloader, trim_captions=True):
    '''
    Runs the model on the test loader and returns the sample with the best 4-gram BLEU score.
    param:
    trim_captions: removes the added <start> and <end> tokens from the captions
    '''
model.eval()
best_blue_4_gram_score = 0
for i, (images, captions, caption_indices, glove_embeddings, image_name) in enumerate(testloader):
with torch.no_grad():
images, captions_emb = images.to(device), caption_indices.to(device)
pred = model(images, captions_emb[:, :-1])
caption_pred = model_output_to_caption(pred, vocab)
if trim_captions:
caption_pred = remove_start_end_words(caption_pred)
captions = remove_start_end_words(captions)
blue_scores_n4 = calculate_blue_batch(captions, caption_pred, 4)
            if blue_scores_n4 > best_blue_4_gram_score:
                best_blue_4_gram_score = blue_scores_n4
                print(f'best blue score 4 gram: {best_blue_4_gram_score:0.4f} on {image_name}')
best_image = images
best_caption_pred = caption_pred
best_true_caption = captions
best_image_name = image_name
return best_image, best_image_name, best_true_caption, best_caption_pred, best_blue_4_gram_score
test_dataloader_bt_1 = DataLoader(
test_set,
batch_size=1,
shuffle=False,
collate_fn=test_set.collate_fn
)
model_name_epoch_240 = '01.01.2024_1341_CNN_LSTM_f_epochs_30'
model_loaded_epoch_240 = load_model(model_name_epoch_240)
image, image_name, true_caption, caption_pred, score = get_best_model_predictions(
model_loaded_epoch_240,
test_dataloader_bt_1
)
pd_data = pd.read_csv('./data/pd_captions.csv')
pd_captions = pd_data[pd_data.image_name == image_name[0]].caption.reset_index(drop=True)
cap1, cap2, cap3, cap4, cap5 = pd_captions[0], pd_captions[1], pd_captions[2], pd_captions[3], pd_captions[4]
plt.figure(figsize = (10, 4))
img_tensor_denorm = denormalize(image[0])
img_pil = transforms.ToPILImage()(img_tensor_denorm)
plt.imshow(img_pil)
plt.suptitle(f'best pred: {caption_pred[0]}', color='red', y=1.15)
plt.title(f'\n true:{cap1} \n true:{cap2} \n true:{cap3} \n true:{cap4} \n true:{cap5}', color='green', fontsize=9)
plt.axis('off')
plt.show()
best blue score 4 gram: 0.0000 on ('1001773457_577c3a7d70.jpg',)
best blue score 4 gram: 0.1313 on ('1001773457_577c3a7d70.jpg',)
best blue score 4 gram: 0.1594 on ('1009434119_febe49276a.jpg',)
best blue score 4 gram: 0.2480 on ('1009434119_febe49276a.jpg',)
best blue score 4 gram: 0.5193 on ('101654506_8eb26cfb60.jpg',)
best blue score 4 gram: 0.6787 on ('1397887419_e798697b93.jpg',)
best blue score 4 gram: 0.7260 on ('2208631481_3e4a5675e1.jpg',)
best blue score 4 gram: 0.7598 on ('2428751994_88a6808246.jpg',)
best blue score 4 gram: 0.7861 on ('2698119128_62b4741043.jpg',)
best blue score 4 gram: 0.8409 on ('733752482_ee01a419e5.jpg',)
End - Mini Challenge 2